iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 19
1
Software Development

服務開發雜談系列 第 19

etcd Txn事務

  • 分享至 

  • xImage
  •  

etcd提供了事務實現.
可以實現多個Key的原子(Atomic)操作.
MySQL主要是透過lock或者是MVCC+Next-Key Lock這些機制來實現.
但etcd不同, 主要是基於CAS(Compare and Swap)方式(也是樂觀鎖的一種實現方式)來實現.
因為有ModRevision可以比較來做實現.

Txn提供了以下的接口

type Txn interface {
	If(cs ...Cmp) Txn

	Then(ops ...Op) Txn

	Else(ops ...Op) Txn

	Commit() (*TxnResponse, error)
}

對就只有這樣If-Then-Else, 最後就commit.

txn().If(cond1, cond2, ...).Then(op1, op2,...).Else(op1', op2',...).Commit)()

如果If的條件全為真, 則會執行Then中的操作, 且整個事務返回true;
反之執行Else內的操作.

Demo1:
演示一下用戶相互轉帳, 做事務交易的場景.
當userA轉100$給userB, 只要餘額不夠就取消交易.
餘額足夠就執行事務.

一開始userFrom有200$, userTo有10$, 一次轉帳100$過去.
依序轉帳3次

package main

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"log"
	"strconv"
	"time"

	"go.etcd.io/etcd/clientv3"
)

var (
	dialTimeout      = 10 * time.Second
	requestTimeout   = 5 * time.Second
	keepAliveTimeout = 5 * time.Second
)

func main() {
	cancelCtx, canFunc := context.WithCancel(context.Background())

	var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
	cli, err := clientv3.New(clientv3.Config{
		Context:     cancelCtx,
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer cli.Close()

	kv := clientv3.NewKV(cli)
	getUsersBalance(cancelCtx, kv, "user1", "user2")
	updateUserBalance(cancelCtx, kv, "user1", "user2", 100)
	getUsersBalance(cancelCtx, kv, "user1", "user2")
	canFunc()
	fmt.Println("Service Terminate")
}

func getUsersBalance(ctx context.Context, cli clientv3.KV, userFrom, userTo string) {
	getResp, err := cli.Txn(ctx).Then(clientv3.OpGet(userFrom), clientv3.OpGet(userTo)).Commit()
	if err != nil {
		return
	}
	fromUserKv := getResp.Responses[0].GetResponseRange().Kvs[0]
	toUserKv := getResp.Responses[1].GetResponseRange().Kvs[0]
	userFromBalance, _ := toUint64(fromUserKv.Value)
	userToBalance, _ := toUint64(toUserKv.Value)

	fmt.Printf("userFrom:%d, userTo:%d\n", userFromBalance, userToBalance)
}

func updateUserBalance(ctx context.Context, cli clientv3.KV, userFrom, userTo string, amount uint64) (bool, error) {
	getResp, err := cli.Txn(ctx).Then(clientv3.OpGet(userFrom), clientv3.OpGet(userTo)).Commit()
	if err != nil {
		return false, err
	}
	fromUserKv := getResp.Responses[0].GetResponseRange().Kvs[0]
	toUserKv := getResp.Responses[1].GetResponseRange().Kvs[0]
	userFromBalance, _ := toUint64(fromUserKv.Value)
	userToBalance, _ := toUint64(toUserKv.Value)

	if userFromBalance < uint64(amount) {
		fmt.Println("insufficient balance")
		return false, errors.New("insufficient balance")
	}
	fmt.Println("start transaction")
	txn := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.ModRevision(userFrom), "=", fromUserKv.ModRevision),
			clientv3.Compare(clientv3.ModRevision(userTo), "=", toUserKv.ModRevision)).
		Then(clientv3.OpPut(userFrom, strconv.FormatInt(int64(userFromBalance-amount), 10)),
			clientv3.OpPut(userTo, strconv.FormatInt(int64(userToBalance+amount), 10)))

	updateResp, err := txn.Commit()
	if err != nil {
		return false, err
	}
	fmt.Println("transaction success")
	return updateResp.Succeeded, nil
}

func toUint64(v []byte) (uint64, error) {
	str := string(v[:])
	return strconv.ParseUint(str, 10, 64)
}
func fromUInt64(v uint64) []byte {
	b := make([]byte, binary.MaxVarintLen64)
	return b[:binary.PutUvarint(b, v)]
}


// 執行3次的結果
/*
nathan@pc:~/go/src/ITHOME/demo/day19$ go run main.go 
userFrom:200, userTo:10
start transaction
transaction success
userFrom:100, userTo:110
Service Terminate

nathan@pc:~/go/src/ITHOME/demo/day19$ go run main.go 
userFrom:100, userTo:110
start transaction
transaction success
userFrom:0, userTo:210
Service Terminate

nathan@pc:~/go/src/ITHOME/demo/day19$ go run main.go 
userFrom:0, userTo:210
insufficient balance
userFrom:0, userTo:210
Service Terminate
*/

來講解一下.
etcd為了處理來自不同客戶端的併發請求, 還有達成事務隔離, 避免Race condition.
允許client在一次修改中批量執行多組操作命令, 就表示一組操作被綁成一個原子操作, 並且共享同一個revision.

MySQL為了實現Serializable(串行化執行可能會有衝突的事務, 就讓衝突的事務,依照順序進行阻塞等待), 往往使用上了悲觀鎖, 這導致效能非常低.
etcd這裡用的則是CAS, 沒有實際的Lock.

這裡先用一個事務, 原子操作同時讀取兩個KV

getResp, err := cli.Txn(ctx).Then(clientv3.OpGet(userFrom), clientv3.OpGet(userTo)).Commit()

fromUserKv := getResp.Responses[0].GetResponseRange().Kvs[0]
toUserKv := getResp.Responses[1].GetResponseRange().Kvs[0]

在準備第二個事務, 同時檢查兩個KV, 現在的ModRevision有沒有等於我們當時讀取到的ModRevision.
條件全成真, 則進行Then裡面的加減錢的操作.
這裡都是同一個原子操作的

txn := cli.Txn(ctx).
    If(clientv3.Compare(clientv3.ModRevision(userFrom), "=", fromUserKv.ModRevision),
        clientv3.Compare(clientv3.ModRevision(userTo), "=", toUserKv.ModRevision)).
    Then(clientv3.OpPut(userFrom, strconv.FormatInt(int64(userFromBalance-amount), 10)),
        clientv3.OpPut(userTo, strconv.FormatInt(int64(userToBalance+amount), 10)))

updateResp, err := txn.Commit()

Demo2:
演示一下用戶相互轉帳,卻有兩個線程併發操作同一筆資料.
一開始userFrom有100$, userTo有10$, 一次轉帳100$過去.
如果這事務隔離不能防Non-Repeatable Read的話, 會變成userFrom 0$, 但userTo會有210$. (真好送錢了)

package main

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"log"
	"strconv"
	"sync"
	"time"

	"go.etcd.io/etcd/clientv3"
)

var (
	dialTimeout      = 10 * time.Second
	requestTimeout   = 5 * time.Second
	keepAliveTimeout = 5 * time.Second
)

func main() {
	cancelCtx, canFunc := context.WithCancel(context.Background())

	var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
	cli, err := clientv3.New(clientv3.Config{
		Context:     cancelCtx,
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer cli.Close()

	var wg sync.WaitGroup
	wg.Add(2)
	kv := clientv3.NewKV(cli)
	getUsersBalance(cancelCtx, kv, "user1", "user2")
	go updateUserBalance(cancelCtx, kv, "user1", "user2", 100, &wg, true)
	time.Sleep(10 * time.Millisecond)
	go updateUserBalance(cancelCtx, kv, "user1", "user2", 100, &wg, false)
	wg.Wait()
	getUsersBalance(cancelCtx, kv, "user1", "user2")

	canFunc()
	fmt.Println("Service Terminate")
}

func getUsersBalance(ctx context.Context, cli clientv3.KV, userFrom, userTo string) {
	getResp, err := cli.Txn(ctx).Then(clientv3.OpGet(userFrom), clientv3.OpGet(userTo)).Commit()
	if err != nil {
		return
	}
	fromUserKv := getResp.Responses[0].GetResponseRange().Kvs[0]
	toUserKv := getResp.Responses[1].GetResponseRange().Kvs[0]
	userFromBalance, _ := toUint64(fromUserKv.Value)
	userToBalance, _ := toUint64(toUserKv.Value)

	fmt.Printf("userFrom:%d, userTo:%d\n", userFromBalance, userToBalance)
}

func updateUserBalance(ctx context.Context, cli clientv3.KV, userFrom, userTo string, amount uint64, wg *sync.WaitGroup, isFirst bool) (bool, error) {
	defer wg.Done()
	getResp, err := cli.Txn(ctx).Then(clientv3.OpGet(userFrom), clientv3.OpGet(userTo)).Commit()
	if err != nil {
		return false, err
	}
	fromUserKv := getResp.Responses[0].GetResponseRange().Kvs[0]
	toUserKv := getResp.Responses[1].GetResponseRange().Kvs[0]
	userFromBalance, _ := toUint64(fromUserKv.Value)
	userToBalance, _ := toUint64(toUserKv.Value)

	if userFromBalance < uint64(amount) {
		fmt.Println("insufficient balance")
		return false, errors.New("insufficient balance")
	}
	fmt.Println("start transaction")
	if isFirst {
		time.Sleep(2 * time.Second)
	}
	txn := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.ModRevision(userFrom), "=", fromUserKv.ModRevision),
			clientv3.Compare(clientv3.ModRevision(userTo), "=", toUserKv.ModRevision)).
		Then(clientv3.OpPut(userFrom, strconv.FormatInt(int64(userFromBalance-amount), 10)),
			clientv3.OpPut(userTo, strconv.FormatInt(int64(userToBalance+amount), 10)))

	updateResp, err := txn.Commit()
	if err != nil {
		return false, err
	}
	if updateResp.Succeeded == false {
		fmt.Println("transaction fail")
		return false, errors.New("transaction fail")
	}
	fmt.Println("transaction success")

	return updateResp.Succeeded, nil
}

func toUint64(v []byte) (uint64, error) {
	str := string(v[:])
	return strconv.ParseUint(str, 10, 64)
}
func fromUInt64(v uint64) []byte {
	b := make([]byte, binary.MaxVarintLen64)
	return b[:binary.PutUvarint(b, v)]
}
/*
userFrom:200, userTo:1010
start transaction
start transaction
transaction success
transaction fail
userFrom:100, userTo:1110
Service Terminate
*/

從結果來看, 可以發現第一個提出事務交易的線程最後是提交失敗的.

這裡刻意用兩個線程, 線程1先執行了updateUserBalance, 線程2後面才執行.

go updateUserBalance(cancelCtx, kv, "user1", "user2", 100, &wg, true)
time.Sleep(10 * time.Millisecond)
go updateUserBalance(cancelCtx, kv, "user1", "user2", 100, &wg, false)

第一個線程取得兩個KV當下的revision,就睡個2秒去了XD
然後第二個線程取得兩個KV當下的revision, 並且事務檢查成功, 提交了, 扣了錢.
此時這兩個kv的ModRevision就被異動了.
第一個線程醒來後, 開始txn的檢查, 在提交時, 就會回傳提交失敗了.
這樣不會白白送錢給人了, 但當然client side自己要handle這類的錯誤就是.

if isFirst {
    time.Sleep(2 * time.Second)
}
txn := cli.Txn(ctx).
    If(clientv3.Compare(clientv3.ModRevision(userFrom), "=", fromUserKv.ModRevision),
        clientv3.Compare(clientv3.ModRevision(userTo), "=", toUserKv.ModRevision)).
    Then(clientv3.OpPut(userFrom, strconv.FormatInt(int64(userFromBalance-amount), 10)),
        clientv3.OpPut(userTo, strconv.FormatInt(int64(userToBalance+amount), 10)))

updateResp, err := txn.Commit()
if err != nil {
    return false, err
}
if updateResp.Succeeded == false {
    fmt.Println("transaction fail")
    return false, errors.New("transaction fail")
}
fmt.Println("transaction success")

上一篇
etcd Watch監看
下一篇
etcd 服務註冊與發現中心
系列文
服務開發雜談33
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言